package server

import (
	"context"
	"errors"
	"fmt"
	"gitee.com/zhucheer/orange/logger"
	"gitee.com/zhucheer/pub-connect/app/define"
	pubproto "gitee.com/zhucheer/pub-connect/app/proto"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
	"log"
	"math/rand"
	"net"
	"strings"
	"sync"
	"time"
)

// AppInfo identifies one connected agent instance. The watcher lookup
// methods of AgentServer build it from a watcher's appName and appUuid.
type AppInfo struct {
	Name string // application name of the agent
	UUID string // unique ID of the agent instance
}

// Work is the server-side record of one task dispatched to an agent.
// SendWork creates the record and stores it in AgentServer.workList keyed
// by Uuid.
//
// PriKey is copied unchanged into the WorkDetail pushed to the agent
// (NOTE(review): presumably a shared secret — confirm against the proto
// definition). AppUuid is the ID of the agent instance the task is routed to.
type Work struct {
	PriKey       string
	AppName      string // application name of the executor the task is dispatched to
	AppUuid      string
	Name         string // name of the task to execute
	Uuid         string // unique ID of this task
	Params       string // parameters of the task
	RequestTime  int64  // time the task was submitted (Unix seconds)
	ResponseTime int64  // time the task was answered (Unix seconds)
	Status       string // status of the task
	Results      string // result returned when the task is queried
}

// watcher represents one connected agent stream.
//
// appName and appUuid identify the agent; appUuid is the key under which the
// watcher is stored in AgentServer.watcherGroup. ch is the queue SendWork
// pushes tasks into; DoWork creates it with a buffer of 20.
// NOTE(review): ch is presumably drained by the stream's sendLoop, which is
// defined outside this file — confirm.
type watcher struct {
	appName string
	appUuid string
	ch      chan pubproto.WorkDetail
}

// AgentServer implements the PubConnect gRPC service and keeps the in-memory
// state of dispatched tasks and connected agents.
//
// workList maps a task UUID to its Work record and watcherGroup maps an agent
// UUID to its watcher; the methods in this file take mutex before writing to
// either map.
//
// RegisterFunDo is invoked by Register for every registration request, and
// CloserFunDo is invoked by cancelWatcher when a registered watcher is
// removed. NewServer leaves both callbacks nil, so the owner of the server is
// expected to assign them before serving.
type AgentServer struct {
	pubproto.UnimplementedPubConnectServer
	DoWorkChan    chan Work // channel for messages reported proactively by clients
	workList      map[string]*Work
	watcherGroup  map[string]*watcher
	mutex         sync.Mutex
	RegisterFunDo func(appName, appUuid, secret string) error
	CloserFunDo   func(appName, appUuid string) error
}

// serverWatchStream holds the per-connection state of one DoWork stream.
//
// gRPCStream is the bidirectional stream handed to DoWork, wg counts the
// sendLoop goroutine that DoWork starts, and watcher carries the work queue
// of the agent on this stream. closec is created by DoWork for every stream.
// NOTE(review): closec is presumably closed by close() to stop the loops;
// close, sendLoop and recvLoop are defined outside this file — confirm.
type serverWatchStream struct {
	gRPCStream pubproto.PubConnect_DoWorkServer
	closec     chan struct{}
	wg         sync.WaitGroup
	watcher    *watcher
}

// SendWork records a new task for the agent instance appUuid and pushes it
// onto that agent's work queue.
//
// priKey, appName, name and params are copied into the task unchanged. The
// returned actionUuid has the form "<unix-seconds>-<uuid>" and is the key to
// pass to GetWorkResult.
//
// When no watcher is registered for appUuid the task is still stored, but
// already marked as failed with an explanatory Results text; the ID is
// returned in that case as well.
func (s *AgentServer) SendWork(priKey, appName, appUuid, name, params string) (actionUuid string) {
	requestTime := time.Now().Unix()
	actionUuid = fmt.Sprintf("%d-%s", requestTime, uuid.New())
	workInfo := &Work{
		PriKey:      priKey,
		AppName:     appName,
		AppUuid:     appUuid,
		Name:        name,
		Uuid:        actionUuid,
		Params:      params,
		RequestTime: requestTime,
		Status:      define.StatusTodo,
	}

	// The lock only covers the bookkeeping on the shared maps. It is released
	// before the channel send below, because that send blocks when the
	// agent's queue is full and would otherwise stall every other method that
	// needs the mutex (including the one that removes a dead agent).
	s.mutex.Lock()
	s.workList[actionUuid] = workInfo

	wa := s.getWatcherByAgentUUID(appUuid)
	if wa == nil {
		workInfo.Status = define.StatusFailed
		workInfo.Results = "agent " + appName + " 非活跃状态"
		// Stamp the response time so the expiry sweep can eventually drop
		// this record; it only deletes records with ResponseTime > 0.
		workInfo.ResponseTime = requestTime
		s.mutex.Unlock()

		return actionUuid
	}

	// Snapshot the task while still holding the lock.
	detail := pubproto.WorkDetail{
		PriKey:      workInfo.PriKey,
		AppName:     workInfo.AppName,
		AppUuid:     workInfo.AppUuid,
		Name:        workInfo.Name,
		Uuid:        workInfo.Uuid,
		Params:      workInfo.Params,
		RequestTime: workInfo.RequestTime,
		Status:      workInfo.Status,
	}
	s.mutex.Unlock()

	fmt.Println("---------callback push work to client--------", name)
	wa.ch <- detail
	return actionUuid
}

// GetWatcherItem picks one connected agent instance whose application name
// equals appName, choosing uniformly at random when several are connected.
//
// It returns an error when no matching agent is connected. The method
// acquires s.mutex while scanning the watcher map, so it must not be called
// by code that already holds that mutex.
func (s *AgentServer) GetWatcherItem(appName string) (appInfo AppInfo, err error) {
	// Iterate under the lock: addWatcher and cancelWatcher modify the map
	// from other goroutines.
	s.mutex.Lock()
	appList := make([]AppInfo, 0, len(s.watcherGroup))
	for _, item := range s.watcherGroup {
		if item.appName == appName {
			appList = append(appList, AppInfo{Name: item.appName, UUID: item.appUuid})
		}
	}
	s.mutex.Unlock()

	if len(appList) == 0 {
		return appInfo, errors.New("not fount agent")
	}

	// Use a private generator instead of reseeding the process-wide one on
	// every call, which would affect all other users of math/rand.
	picker := rand.New(rand.NewSource(time.Now().UnixNano()))
	return appList[picker.Intn(len(appList))], nil
}

// GetWatcherFilterItem picks one connected agent instance whose application
// name equals appName and whose UUID contains filterContent, choosing
// uniformly at random when several match.
//
// It returns an error when no matching agent is connected. The method
// acquires s.mutex while scanning the watcher map, so it must not be called
// by code that already holds that mutex.
func (s *AgentServer) GetWatcherFilterItem(appName, filterContent string) (appInfo AppInfo, err error) {
	// Iterate under the lock: addWatcher and cancelWatcher modify the map
	// from other goroutines.
	s.mutex.Lock()
	appList := make([]AppInfo, 0, len(s.watcherGroup))
	for _, item := range s.watcherGroup {
		if item.appName == appName && strings.Contains(item.appUuid, filterContent) {
			appList = append(appList, AppInfo{Name: item.appName, UUID: item.appUuid})
		}
	}
	s.mutex.Unlock()

	if len(appList) == 0 {
		return appInfo, errors.New("not fount agent")
	}

	// Use a private generator instead of reseeding the process-wide one on
	// every call, which would affect all other users of math/rand.
	picker := rand.New(rand.NewSource(time.Now().UnixNano()))
	return appList[picker.Intn(len(appList))], nil
}

// GetWatcherList returns the name and UUID of every agent that is currently
// connected. The result is a fresh, never-nil slice in no particular order.
//
// The method acquires s.mutex while scanning the watcher map, so it must not
// be called by code that already holds that mutex.
func (s *AgentServer) GetWatcherList() (appList []AppInfo) {
	// Iterate under the lock: addWatcher and cancelWatcher modify the map
	// from other goroutines.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	appList = make([]AppInfo, 0, len(s.watcherGroup))
	for _, item := range s.watcherGroup {
		appList = append(appList, AppInfo{Name: item.appName, UUID: item.appUuid})
	}

	return appList
}

// GetWorkResult waits for the task identified by uuid to finish and returns
// a copy of its record.
//
// The task list is polled every 100ms for at most 600 rounds (about one
// minute). The call returns as soon as the task status is success or failed,
// returns an error immediately if the task is not (or no longer) in the
// list, and returns a timeout error when the task has not finished in time.
func (s *AgentServer) GetWorkResult(uuid string) (Work, error) {
	const (
		maxPolls     = 600
		pollInterval = 100 * time.Millisecond
	)

	for i := 0; i < maxPolls; i++ {
		// Copy the record while holding the lock: other goroutines update
		// its fields under the same mutex, so reading them after the lock
		// has been released would be a data race.
		s.mutex.Lock()
		work, ok := s.workList[uuid]
		var snapshot Work
		if ok {
			snapshot = *work
		}
		s.mutex.Unlock()

		if !ok {
			return Work{}, errors.New("work not exist")
		}
		if snapshot.Status == define.StatusSuccess || snapshot.Status == define.StatusFailed {
			return snapshot, nil
		}
		time.Sleep(pollInterval)
	}
	return Work{}, errors.New("grpc time out:" + uuid)
}

// getWorkSave looks up the task record stored under uid while holding the
// server mutex. It returns the stored pointer (not a copy), or an error when
// no record exists for uid.
func (s *AgentServer) getWorkSave(uid string) (*Work, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if item, found := s.workList[uid]; found {
		return item, nil
	}
	return nil, errors.New("work not exist")
}

// ConnectAfterDo is the hook meant to run after a channel has been created
// (its caller is not visible in this file). It currently only prints req to
// standard output.
func (s *AgentServer) ConnectAfterDo(req *pubproto.WorkDetail) {

	fmt.Println("-----------after connect2------------------", req)
}

// Register is the unary RPC through which an agent node registers itself.
//
// The request is printed and then handed to the RegisterFunDo business
// callback (application name, UUID and private key). When the callback
// fails, a response with failed status and the error text as remark is
// returned together with the error; otherwise a success response is returned.
//
// NOTE(review): RegisterFunDo is called without a nil check and NewServer
// does not set it — the owner of the server must assign it before serving.
func (s *AgentServer) Register(c context.Context, req *pubproto.RegReq) (*pubproto.RegResp, error) {
	fmt.Println("Register Server Rev", req)

	if cbErr := s.RegisterFunDo(req.AppName, req.Uuid, req.PriKey); cbErr != nil {
		failed := &pubproto.RegResp{
			Status: define.StatusFailed,
			Remark: cbErr.Error(),
		}
		return failed, cbErr
	}

	return &pubproto.RegResp{Status: define.StatusSuccess}, nil
}

// expireSweepers records the AgentServer instances whose cleanExpireWork
// goroutine has already been started, so that DoWork launches the sweep once
// per server rather than once per stream.
var expireSweepers sync.Map

// DoWork is the bidirectional streaming RPC that forms the core data channel
// between server and agent. Every call creates an independent
// serverWatchStream with its own work queue.
//
// It starts one goroutine running the stream's send loop and one running its
// receive loop, then blocks until the receive loop reports an error or the
// stream context is done. On the way out the agent's watcher is removed
// (except when the error is AGENT_CREATE_ERR, in which case the entry is left
// untouched) and the stream state is closed. The method always returns nil.
func (s *AgentServer) DoWork(stream pubproto.PubConnect_DoWorkServer) error {
	sws := serverWatchStream{
		gRPCStream: stream,
		closec:     make(chan struct{}),
		watcher: &watcher{
			ch: make(chan pubproto.WorkDetail, 20),
		},
	}
	sws.wg.Add(1)
	go func() {
		// Done runs only once the send loop has returned.
		defer sws.wg.Done()
		sws.sendLoop()
	}()

	// Keep listening for data sent back by the client. The buffer of one
	// lets this goroutine finish even if nobody receives the error anymore.
	errc := make(chan error, 1)
	go func() {
		if rerr := sws.recvLoop(s); rerr != nil {
			errc <- rerr
		}
	}()

	// Sweep workList for timed-out tasks. cleanExpireWork never returns, so
	// it is started only for the first stream of a server; starting it per
	// stream would leak one goroutine for every connection ever made.
	if _, started := expireSweepers.LoadOrStore(s, struct{}{}); !started {
		go s.cleanExpireWork()
	}

	// Wait for the stream to end, either by error or by context.
	select {
	case err := <-errc:
		fmt.Println("this is stream err done")
		if !errors.Is(err, AGENT_CREATE_ERR) {
			s.cancelWatcher(sws.watcher.appUuid)
		}
	case <-stream.Context().Done():
		s.cancelWatcher(sws.watcher.appUuid)
		fmt.Println("this is stream content done")
	}
	sws.close()
	return nil
}

// CloseWatcher lets code outside this package drop the connection entry of
// the agent identified by uuid. It simply delegates to cancelWatcher.
func (s *AgentServer) CloseWatcher(uuid string) {
	s.cancelWatcher(uuid)
}

// cleanExpireWork sweeps the task list every five minutes and never returns.
//
// During a sweep, a task still in the "doing" state whose request is older
// than five minutes is marked failed and stamped with the current time. A
// task whose response time is set and older than five minutes is deleted
// from the list to release memory.
func (s *AgentServer) cleanExpireWork() {
	const (
		sweepEvery       = 5 * time.Minute
		maxAgeSecs int64 = 5 * 60
	)

	timer := time.NewTimer(sweepEvery)
	for {
		timer.Reset(sweepEvery)
		<-timer.C

		now := time.Now().Unix()
		cutoff := now - maxAgeSecs

		s.mutex.Lock()
		for id, item := range s.workList {
			if item.Status == define.StatusDoing && item.RequestTime < cutoff {
				item.Status = define.StatusFailed
				item.ResponseTime = now
			}

			// Answered more than five minutes ago: forget the request.
			if item.ResponseTime > 0 && item.ResponseTime < cutoff {
				delete(s.workList, id)
			}
		}
		s.mutex.Unlock()
	}
}

// getWatcherByAgentUUID returns the watcher registered under the agent UUID
// uuid, or nil when there is none.
//
// It does not take the server mutex itself; callers that run concurrently
// with addWatcher or cancelWatcher must hold it.
func (s *AgentServer) getWatcherByAgentUUID(uuid string) *watcher {
	// watcherGroup is keyed by agent UUID, so a direct lookup replaces the
	// scan over every entry; a missing key yields the nil zero value.
	return s.watcherGroup[uuid]
}

// addWatcher registers wa under its appUuid. It returns an error, and leaves
// the existing entry untouched, when a watcher with the same UUID is already
// registered.
func (s *AgentServer) addWatcher(wa *watcher) error {
	// Take the lock before the existence check so that check and insert are
	// one atomic step; checking first and locking afterwards would let two
	// streams with the same UUID both pass the check.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.watcherGroup[wa.appUuid] != nil {
		return errors.New("agent already exist")
	}

	s.watcherGroup[wa.appUuid] = wa
	fmt.Println("ADD Watcher APP-UUID:" + wa.appUuid)

	return nil
}

// cancelWatcher removes the watcher registered under uuid and, if one was
// registered, notifies the business layer through the CloserFunDo callback.
// Calling it for an unknown uuid only prints the log line.
func (s *AgentServer) cancelWatcher(uuid string) {
	fmt.Println("CancelWacher uuid=>" + uuid)

	s.mutex.Lock()
	wa := s.watcherGroup[uuid]
	delete(s.watcherGroup, uuid)
	s.mutex.Unlock()

	// The callback is user code: run it after the lock has been released so
	// that a slow callback, or one that calls back into the server, cannot
	// block or deadlock the other methods that need the mutex.
	if wa == nil || s.CloserFunDo == nil {
		return
	}
	// The client stream is closed; report a failing callback instead of
	// silently dropping its error.
	if err := s.CloserFunDo(wa.appName, wa.appUuid); err != nil {
		fmt.Println("CancelWacher closer callback failed uuid=>"+uuid, err)
	}
}

// NewServer builds an AgentServer with empty task and watcher maps and a
// DoWorkChan buffered for 50 entries. The RegisterFunDo and CloserFunDo
// callbacks are left unset.
func NewServer() *AgentServer {
	return &AgentServer{
		workList:     make(map[string]*Work),
		watcherGroup: make(map[string]*watcher),
		DoWorkChan:   make(chan Work, 50),
	}
}

// StartGRPCServer listens on TCP port grpcPort on all interfaces, registers
// s as the PubConnect service and serves until the gRPC server stops.
//
// The process exits through log.Fatalf when the port cannot be opened. The
// server is configured with a keepalive MaxConnectionIdle of five minutes.
// The call blocks while serving; once Serve returns, its result is written to
// the error log.
func StartGRPCServer(grpcPort int, s *AgentServer) {
	addr := fmt.Sprintf("0.0.0.0:%d", grpcPort)
	listen, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	grpcServer := grpc.NewServer(
		grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionIdle: 5 * time.Minute}),
	)
	pubproto.RegisterPubConnectServer(grpcServer, s)

	banner := fmt.Sprintf("PUB-CONNECT START GRPC==>0.0.0.0:%d", grpcPort)
	fmt.Println(banner)

	serveErr := grpcServer.Serve(listen)
	logger.Errorw("GRPC Server Stop", "err", serveErr)
}
